package local

import (
	"bytes"
	"container/list"
	"encoding/json"
	"fmt"
	"gitee.com/gitee-go/core"
	"gitee.com/gitee-go/core/bean/hbtpBean"
	"gitee.com/gitee-go/core/common"
	"gitee.com/gitee-go/core/runtime"
	"gitee.com/gitee-go/runner-core/runCloud"
	"gitee.com/gitee-go/server/comm"
	comm2 "gitee.com/gitee-go/server/engine/comm"
	"gitee.com/gitee-go/server/engine/dao"
	"gitee.com/gitee-go/utils"
	"gitee.com/gitee-go/utils/ioex"
	hbtp "github.com/mgr9525/HyperByte-Transfer-Protocol"
	"os"
	"path/filepath"
	"runtime/debug"
	"sync"
	"time"
)

// TaskStage is a Stage that is currently running.
type TaskStage struct {
	// wg counts the job goroutines started by runStage; each runJob calls Done on it.
	wg    sync.WaitGroup
	// stage is the runtime stage whose status and error runStage updates.
	stage *runtime.Stage
	// jobs holds the stage's jobs, looked up by job name.
	jobs  map[string]*comm2.SyncJob
}

// taskItem is one concrete pipeline run task.
type taskItem struct {
	prt       *BuildEngine // build engine that manages this task
	lk        sync.Mutex
	build     *runtime.Build // the build this task corresponds to
	ctrlend   bool           // set when the run is stopped manually
	ctrlendtm time.Time      // time of the kill request

	end   bool // false once start is called, true after the run goroutine has finished
	bngtm time.Time
	endtm time.Time

	staglk sync.Mutex
	stages map[string]*TaskStage // stages of this task, looked up by stage name

	joblk sync.Mutex
	jobls *list.List // jobs in the order they actually started running; used to merge their logs in that order
}

// start launches the build asynchronously.
//
// It resets the per-run bookkeeping (end flag, begin time, stage map and job
// list) and then, in a new goroutine:
//  1. checks the build via c.check (defined outside this file section);
//  2. marks every stage and job as pending and runs the stages in order,
//     stopping at the first stage that does not finish with status OK;
//  3. finalizes: records the end time, starts the log merge and persists
//     the final build state.
//
// A panic raised while the build runs is recorded on the build (Error and
// an error status) before the build is persisted.
func (c *taskItem) start() {
	c.end = false
	c.bngtm = time.Now()
	c.stages = make(map[string]*TaskStage)
	c.jobls = list.New()

	go func() {
		// Registered first, so it runs last: safety net for a panic raised
		// by the finalizer below.
		defer func() {
			if err := recover(); err != nil {
				core.LogPnc.Errorf("local.taskItem runTask:%+v", err)
				core.LogPnc.Errorf("%s", string(debug.Stack()))
			}
		}()
		// Finalizer. It recovers first so that a panic from the run is
		// stored on the build before dao.UpdateBuild persists it.
		defer func() {
			if err := recover(); err != nil {
				core.LogPnc.Errorf("local.taskItem runTask:%+v", err)
				core.LogPnc.Errorf("%s", string(debug.Stack()))
				c.build.Error = fmt.Sprintf("%+v", err)
				c.build.Status = common.BUILD_STATUS_ERROR
			}
			c.end = true
			c.endtm = time.Now()
			c.build.Finished = time.Now()
			core.Log.Debugf("build %s end!!", c.build.Id)
			// Once the pipeline is done, merge the logs of all jobs so the
			// user can download a single file.
			go c.compareLog()
			// Persist the final state of the build.
			if err := dao.UpdateBuild(c.build); err != nil {
				core.Log.Errorf("dao.UpdateBuild err:%v", err)
			}
		}()

		c.build.Started = time.Now()
		c.build.Status = common.BUILD_STATUS_PENDING
		if !c.check() {
			c.build.Status = common.BUILD_STATUS_ERROR
			return
		}

		c.build.Status = common.BUILD_STATUS_READY
		/*if err := c.getrepo(); err != nil {
			c.build.Status = common.BUILD_STATUS_ERROR
			c.build.Error = fmt.Sprintf("get repostiory err:%+v", err)
		}*/
		c.build.Status = common.BUILD_STATUS_RUNNING
		// Put every stage and job into the pending state.
		for _, v := range c.build.Cfg.Pipe.Stages {
			v.Status = common.BUILD_STATUS_PENDING
			for _, e := range v.Jobs {
				e.Status = common.BUILD_STATUS_PENDING
			}
		}
		if err := dao.UpdateBuild(c.build); err != nil {
			core.Log.Errorf("dao.UpdateBuild err:%v", err)
		}
		// Stages run strictly one after another; the first stage that does
		// not end OK decides the build status.
		for _, v := range c.build.Cfg.Pipe.Stages {
			c.runStage(v)
			if v.Status != common.BUILD_STATUS_OK {
				c.build.Status = v.Status
				return
			}
		}

		c.build.Status = common.BUILD_STATUS_OK
	}()
}

// getrepo currently does nothing and always returns nil.
// NOTE(review): presumably a placeholder for fetching the build's repository;
// its only call site in start is commented out — confirm before relying on it.
func (c *taskItem) getrepo() error {
	//c.build.RepoInfo.WorkPath
	return nil
}

// runStage runs all jobs of the given stage and blocks until every one of
// them has finished.
//
// Each job is started in its own goroutine via runJob. When all jobs are
// done the stage status is derived from the job results: a job error that is
// not marked ErrIgnore makes the stage fail, a cancelled job makes the stage
// cancelled, otherwise the stage is OK. The stage is persisted with
// dao.UpdateStage when it starts and again when runStage returns.
func (c *taskItem) runStage(stage *runtime.Stage) {
	defer func() {
		// Recover first so that a panic is reflected in the status that is
		// persisted below instead of leaving the stage in the running state.
		if err := recover(); err != nil {
			core.LogPnc.Errorf("local.taskItem runStage:%+v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
			stage.Status = common.BUILD_STATUS_ERROR
			stage.Error = fmt.Sprintf("%+v", err)
		}
		stage.Finished = time.Now()
		dao.UpdateStage(stage)
		core.Log.Debugf("stage %s end!!!", stage.Name)
	}()
	stage.Started = time.Now()
	stage.Status = common.BUILD_STATUS_RUNNING
	//c.logfile.WriteString(fmt.Sprintf("\n****************Stage+ %s\n", stage.Name))
	dao.UpdateStage(stage)
	c.staglk.Lock()
	stg, ok := c.stages[stage.Name]
	c.staglk.Unlock()
	if !ok || stg == nil {
		// stg is nil here, so the error must be reported on the stage that
		// was passed in.
		stage.Status = common.BUILD_STATUS_ERROR
		stage.Error = fmt.Sprintf("not found stage?:%s", stage.Name)
		return
	}

	// Start every job of the stage, then wait for all of them.
	c.staglk.Lock()
	for _, v := range stage.Jobs {
		stg.wg.Add(1)
		jb := stg.jobs[v.Name]
		go c.runJob(stg, jb)
	}
	c.staglk.Unlock()
	stg.wg.Wait()

	// Derive the stage status from the finished jobs.
	for _, v := range stg.jobs {
		v.Lock()
		ign := v.Job().ErrIgnore
		status := v.Job().Status
		errs := v.Job().Error
		v.Unlock()
		if !ign && status == common.BUILD_STATUS_ERROR {
			stg.stage.Status = common.BUILD_STATUS_ERROR
			stg.stage.Error = errs
			return
		}
		if status == common.BUILD_STATUS_CANCEL {
			stg.stage.Status = common.BUILD_STATUS_CANCEL
			stg.stage.Error = errs
			return
		}
	}

	stage.Status = common.BUILD_STATUS_OK
}

// endJob writes the closing log lines of a job and settles the state of its
// command groups and command lines in the database.
//
// When needpush is true and the job ended in error with a non-empty message,
// that message is pushed as a command-error log line first. A system-end log
// line is always pushed. Afterwards every command group/line of the job that
// is still pending is marked cancelled, and every one still running takes
// over the job's status.
func (c *taskItem) endJob(job *comm2.SyncJob, needpush bool) {
	// recover only has an effect when called directly by a deferred function,
	// so it has to live in its own defer to guard this method.
	defer func() {
		if err := recover(); err != nil {
			core.LogPnc.Errorf("local.taskItem endJob:%+v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()

	if needpush && job.Job().Status == common.BUILD_STATUS_ERROR && job.Job().Error != "" {
		err := c.pushLogjson(job.Job(), &hbtpBean.CmdLogLineJson{
			Id:   utils.NewXid(),
			Type: hbtpBean.TypeCmdLogLineCmderr,
			//Name: "get depend artifact err",
			Content: job.Job().Error,
		})
		if err != nil {
			core.Log.Errorf("endJob pushLogjson err:%v", err)
		}
	}
	err := c.pushLogjson(job.Job(), &hbtpBean.CmdLogLineJson{
		Id:   utils.NewXid(),
		Type: hbtpBean.TypeCmdLogLineSysend,
		//Name: "get depend artifact err",
		//Content: job.Job().Error,
	})
	if err != nil {
		core.Log.Errorf("endJob pushLogjson err:%v", err)
	}

	now := time.Now().Format(common.TimeFmt)
	db := comm.DBMain.GetDB()
	// Everything that never started is cancelled.
	db.Exec("update `t_cmd_group` set status=?,finished=? where job_id=? and status=?",
		common.BUILD_STATUS_CANCEL, now, job.Job().Id, common.BUILD_STATUS_PENDING)
	db.Exec("update `t_cmd_line` set status=?,finished=? where job_id=? and status=?",
		common.BUILD_STATUS_CANCEL, now, job.Job().Id, common.BUILD_STATUS_PENDING)
	stats := job.Job().Status
	// Everything that was still running inherits the job's status.
	db.Exec("update `t_cmd_group` set status=?,finished=? where job_id=? and status=?",
		stats, now, job.Job().Id, common.BUILD_STATUS_RUNNING)
	db.Exec("update `t_cmd_line` set status=?,finished=? where job_id=? and status=?",
		stats, now, job.Job().Id, common.BUILD_STATUS_RUNNING)
}

// runJob drives a single job of a stage from start to finish. It does not
// execute the job itself: the job is handed over with c.prt.putJob and this
// method then polls the job status until it reaches an ended state.
//
// Steps:
//  1. If the job has DependsOn entries, validate them (each must exist in the
//     stage and must not be the job itself) and wait until every dependency
//     is OK, or failed with ErrIgnore set. A cancelled dependency cancels
//     this job; a failed dependency without ErrIgnore fails it.
//  2. Append the job to c.jobls, mark it ready, persist it and hand it over
//     with c.prt.putJob.
//  3. Poll until the status is ended. After a manual stop (c.ctrlend) the
//     job is force-failed once more than 3 seconds have passed since
//     c.ctrlendtm, and that failure is then reported as cancelled.
//
// The deferred cleanup always marks the job as ended, calls endJob, persists
// the job and finally signals stage.wg.
func (c *taskItem) runJob(stage *TaskStage, job *comm2.SyncJob) {
	// While needpush is true endJob also pushes the job error as a log line.
	// It is switched off once putJob has accepted the job.
	needpush := true
	defer stage.wg.Done()
	defer func() {
		// Recover first so that endJob and the database update below see the
		// error status and message caused by the panic.
		if err := recover(); err != nil {
			core.LogPnc.Errorf("local.taskItem runJob:%+v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
			job.Lock()
			job.Job().Status = common.BUILD_STATUS_ERROR
			job.Job().Error = fmt.Sprintf("command run err(code:%d):%v", job.Job().ExitCode, err)
			job.Unlock()
		}
		// Mark the job as finished and persist the result.
		job.Lock()
		job.End = true
		job.Job().Finished = time.Now()
		job.Unlock()
		c.endJob(job, needpush)
		core.Log.Debug(fmt.Sprintf("end job(%s:%d):%s", job.Job().Status, job.Job().ExitCode, job.Job().Name))
		dao.UpdateJob(job.Job())
	}()

	job.Lock()
	job.End = false
	job.Unlock()
	core.Log.Debug(fmt.Sprintf("start job(%s):%s", job.Job().Status, job.Job().Name))
	if len(job.Job().DependsOn) > 0 { // DependsOn lists the names of the jobs this job has to wait for
		ls := make([]*comm2.SyncJob, 0)
		for _, v := range job.Job().DependsOn { // validate the dependencies (e.g. a job must not wait for itself)
			if v == "" {
				continue
			}
			e, ok := stage.jobs[v]
			//core.Log.Debugf("job(%s) depend %s(ok:%t)",job.Job().Name,v,ok)
			if !ok {
				job.Lock()
				job.Job().Status = common.BUILD_STATUS_ERROR
				job.Job().Error = fmt.Sprintf("depend on %s not found", v)
				job.Unlock()
				return
			}
			if e.Job().Name == job.Job().Name {
				job.Lock()
				job.Job().Status = common.BUILD_STATUS_ERROR
				job.Job().Error = fmt.Sprintf("depend on %s is your self", e.Job().Name)
				job.Unlock()
				return
			}
			ls = append(ls, e)
		}
		// Poll the dependencies until all of them allow this job to proceed.
		// NOTE(review): if the engine context ends while waiting, the loop
		// exits and the job is still handed to putJob below — confirm this
		// is intended.
		for !ioex.CheckContext(c.prt.ctx) {
			time.Sleep(time.Millisecond * 100)
			if c.ctrlend {
				// Other goroutines read this status under the job lock, so
				// the write has to take it as well.
				job.Lock()
				job.Job().Status = common.BUILD_STATUS_CANCEL
				job.Unlock()
				return
			}
			waitln := len(ls)
			for _, v := range ls {
				// Take one consistent snapshot of the dependency under its lock.
				v.Lock()
				vStats := v.Job().Status
				vIgnore := v.Job().ErrIgnore
				vName := v.Job().Name
				v.Unlock()
				if vStats == common.BUILD_STATUS_OK {
					waitln--
				} else if vStats == common.BUILD_STATUS_CANCEL {
					job.Lock()
					job.Job().Status = common.BUILD_STATUS_CANCEL
					job.Unlock()
					return
				} else if vStats == common.BUILD_STATUS_ERROR {
					if vIgnore {
						waitln--
					} else {
						job.Lock()
						job.Job().Status = common.BUILD_STATUS_ERROR
						job.Job().Error = fmt.Sprintf("depend on %s is err", vName)
						job.Unlock()
						return
					}
				}
			}
			if waitln <= 0 {
				break
			}
		}
	}

	c.joblk.Lock()
	c.jobls.PushBack(job.Job()) // append to the list of jobs in the order they started
	c.joblk.Unlock()

	job.Lock()
	job.Job().Status = common.BUILD_STATUS_READY
	job.Job().Started = time.Now()
	job.Unlock()
	dao.UpdateJob(job.Job())

	err := c.prt.putJob(job) // hand the job over to the engine
	if err != nil {
		job.Lock()
		job.Job().Status = common.BUILD_STATUS_ERROR
		job.Job().Error = fmt.Sprintf("command run err:%v", err)
		job.Unlock()
		return
	}

	needpush = false
	// Poll the job status until it reaches an ended state.
	for !ioex.CheckContext(c.prt.ctx) {
		job.Lock()
		stats := job.Job().Status
		job.Unlock()
		if common.BuildStatusEnded(stats) {
			break
		}
		// After a manual stop, wait for more than 3 seconds since
		// c.ctrlendtm before force-failing the job.
		if c.ctrlend && time.Since(c.ctrlendtm).Seconds() > 3 {
			job.Lock()
			job.Job().Status = common.BUILD_STATUS_ERROR
			job.Unlock()
			break
		}
		time.Sleep(time.Millisecond * 10)
	}
	// This deferred Unlock runs before the cleanup defer above (LIFO), so the
	// cleanup can take the job lock again.
	job.Lock()
	defer job.Unlock()
	// An error that follows a manual stop is reported as a cancellation.
	if c.ctrlend && job.Job().Status == common.BUILD_STATUS_ERROR {
		job.Job().Status = common.BUILD_STATUS_CANCEL
	}
}

// compareLog rebuilds the merged build log of this task.
//
// The file <WorkPath>/<Build>/<build id>/build.log is removed and recreated,
// then the log of every job recorded in c.jobls is appended to it in list
// order via compareJobLog. A failure for a single job is logged and does not
// stop the merge. Panics are recovered and logged.
func (c *taskItem) compareLog() {
	defer func() {
		if r := recover(); r != nil {
			core.LogPnc.Errorf("local.taskItem compareLog:%+v", r)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()

	target := filepath.Join(comm.WorkPath, comm.Build, c.build.Id, "build.log")
	os.RemoveAll(target)
	out, openErr := os.OpenFile(target, os.O_CREATE|os.O_RDWR, 0644)
	if openErr != nil {
		core.LogPnc.Errorf("local.taskItem compareLog:%+v", openErr)
		return
	}
	defer out.Close()

	// The job list is held locked for the whole merge.
	c.joblk.Lock()
	defer c.joblk.Unlock()
	for el := c.jobls.Front(); el != nil; el = el.Next() {
		if mergeErr := c.compareJobLog(el.Value.(*runtime.Job), out); mergeErr != nil {
			core.Log.Errorf("compareJobLog err:%v", mergeErr)
		}
	}
}

// compareJobLog appends the log of one job to the merged build log bdlog in
// a human-readable form.
//
// The job log is read from <WorkPath>/<Build>/<build id>/<Jobs>/<job id>.log,
// which holds one JSON-encoded hbtpBean.CmdLogLineJson per line. Lines that
// do not parse are skipped. Entries are grouped by Gid and the groups are
// written in order of first appearance, framed by a "[step] ... Start" line
// and, if a system-end entry was seen, a "[step] ... End" line.
//
// It returns an error if the job log cannot be opened, if writing to bdlog
// fails, or if a panic was recovered.
func (c *taskItem) compareJobLog(job *runtime.Job, bdlog *os.File) (rterr error) {
	defer func() {
		if err := recover(); err != nil {
			core.LogPnc.Errorf("local.taskItem compareJobLog:%+v", err)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
			rterr = fmt.Errorf("recover:%v", err)
		}
	}()

	dir := filepath.Join(comm.WorkPath, comm.Build, job.BuildId, comm.Jobs)
	logpth := filepath.Join(dir, fmt.Sprintf("%v.log", job.Id))
	fl, err := os.Open(logpth)
	if err != nil {
		return err
	}
	defer fl.Close()

	var gids []string // group ids in order of first appearance
	group := make(map[string][]*hbtpBean.CmdLogLineJson)
	// addLine decodes one log line and files it under its group.
	addLine := func(line []byte) {
		e := &hbtpBean.CmdLogLineJson{}
		if err := json.Unmarshal(line, e); err != nil {
			return // not a valid log entry (e.g. empty or truncated line)
		}
		if _, ok := group[e.Gid]; !ok {
			gids = append(gids, e.Gid)
		}
		group[e.Gid] = append(group[e.Gid], e)
	}

	// Read the file in chunks and split on '\n'; linebuf carries the part of
	// a line that spans chunk boundaries.
	bts := make([]byte, 1024*5)
	linebuf := &bytes.Buffer{}
	for !ioex.CheckContext(c.prt.ctx) {
		rn, err := fl.Read(bts)
		chunk := bts[:rn]
		for len(chunk) > 0 {
			idx := bytes.IndexByte(chunk, '\n')
			if idx < 0 {
				linebuf.Write(chunk)
				break
			}
			linebuf.Write(chunk[:idx])
			addLine(linebuf.Bytes())
			linebuf.Reset()
			chunk = chunk[idx+1:]
		}
		if err != nil {
			break
		}
	}
	// A last line without a trailing newline would otherwise be lost.
	if linebuf.Len() > 0 {
		addLine(linebuf.Bytes())
	}

	// Render the entries into memory and write them with a single call so a
	// write failure can be reported.
	var out bytes.Buffer
	started := false
	var ende *hbtpBean.CmdLogLineJson
	for _, gid := range gids {
		for _, v := range group[gid] {
			/*if v.Type == hbtpBean.TypeCmdLogLineSys {
				continue
			}*/
			tm := v.Times.Format(common.TimeFmt)
			if !started {
				started = true
				fmt.Fprintf(&out, "%s [step] %s(%s) Start\n", tm, job.DisplayName, job.Name)
			}
			out.WriteString(tm)
			out.WriteByte(' ')
			if v.Type == hbtpBean.TypeCmdLogLineSysrun {
				out.WriteString("+ ")
			} else if v.Type == hbtpBean.TypeCmdLogLineSysend {
				ende = v // remembered for the closing "End" line
			}
			out.WriteString(v.Content)
			out.WriteByte('\n')
		}
	}
	if ende != nil {
		fmt.Fprintf(&out, "%s [step] %s(%s) End\n", ende.Times.Format(common.TimeFmt), job.DisplayName, job.Name)
	}
	if _, err := bdlog.Write(out.Bytes()); err != nil {
		return fmt.Errorf("write build log err:%v", err)
	}
	return nil
}

// pushLogjson stamps the log entry e with the current time and sends it for
// the given job through runCloud.DoString("JobLogJson", ...), passing the
// build id and job id as headers.
//
// It returns the transport error if the call fails, or an error carrying the
// response body if the returned code is not hbtp.ResStatusOk. A panic is
// recovered and logged; in that case the returned error is nil.
func (c *taskItem) pushLogjson(job *runtime.Job, e *hbtpBean.CmdLogLineJson) error {
	defer func() {
		if r := recover(); r != nil {
			core.LogPnc.Errorf("ExecTask pushLogjson:%+v", r)
			core.LogPnc.Errorf("%s", string(debug.Stack()))
		}
	}()

	e.Times = time.Now()
	core.Log.Debugf("ExecTask pushLogjson %s(%s):%s", e.Type, e.Gid, e.Content)

	headers := hbtp.Mp{
		"buildid": job.BuildId,
		"jobid":   job.Id,
	}
	status, body, callErr := runCloud.DoString("JobLogJson", e, headers)
	switch {
	case callErr != nil:
		core.Log.Debugf("cmdExec pushLogjson err:%v", callErr)
		return callErr
	case status != hbtp.ResStatusOk:
		return fmt.Errorf("req err:%s", string(body))
	}
	return nil
}
